/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.extensions.sql.impl;



import com.google.auto.service.AutoService;

import com.bff.gaia.unified.sdk.extensions.sql.SqlTransform;

import com.bff.gaia.unified.sdk.extensions.sql.impl.planner.UnifiedRuleSets;

import com.bff.gaia.unified.sdk.extensions.sql.meta.provider.TableProvider;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import org.apache.calcite.avatica.AvaticaFactory;

import org.apache.calcite.jdbc.CalciteConnection;

import org.apache.calcite.jdbc.CalciteFactory;

import org.apache.calcite.jdbc.Driver;

import org.apache.calcite.plan.RelOptPlanner;

import org.apache.calcite.plan.RelOptRule;

import org.apache.calcite.plan.RelTraitDef;

import org.apache.calcite.prepare.CalcitePrepareImpl;

import org.apache.calcite.rel.RelCollationTraitDef;

import org.apache.calcite.rel.rules.CalcRemoveRule;

import org.apache.calcite.rel.rules.SortRemoveRule;

import org.apache.calcite.runtime.Hook;

import org.apache.calcite.tools.RuleSet;



import java.sql.Connection;

import java.sql.SQLException;

import java.util.ArrayList;

import java.util.List;

import java.util.Properties;

import java.util.function.Consumer;



import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA_FACTORY;

import static org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory;



/**

 * Calcite JDBC driver with Unified defaults.

 *

 * <p>Connection URLs have this form:

 *

 * <p><code>jdbc:unified:param1=value1;param2=value2;param3=value3</code>

 *

 * <p>The querystring-style parameters are parsed as {@link PipelineOptions}.

 */

@AutoService(java.sql.Driver.class)

public class JdbcDriver extends Driver {

  public static final JdbcDriver INSTANCE = new JdbcDriver();

  public static final String CONNECT_STRING_PREFIX = "jdbc:unified:";

  static final String TOP_LEVEL_BEAM_SCHEMA = "unified";



  static {

    ClassLoader origLoader = Thread.currentThread().getContextClassLoader();

    try {

      Thread.currentThread().setContextClassLoader(JdbcDriver.class.getClassLoader());



      // init the compiler factory using correct class loader

      getDefaultCompilerFactory();

    } catch (Exception e) {

      throw new IllegalStateException(e);

    } finally {

      Thread.currentThread().setContextClassLoader(origLoader);

    }

    // inject unified rules into planner

    Hook.PLANNER.add(

        (Consumer<RelOptPlanner>)

            planner -> {

              for (RuleSet ruleSet : UnifiedRuleSets.getRuleSets()) {

                for (RelOptRule rule : ruleSet) {

                  planner.addRule(rule);

                }

              }

              planner.removeRule(CalcRemoveRule.INSTANCE);

              planner.removeRule(SortRemoveRule.INSTANCE);



              for (RelOptRule rule : CalcitePrepareImpl.ENUMERABLE_RULES) {

                planner.removeRule(rule);

              }



              List<RelTraitDef> relTraitDefs = new ArrayList<>(planner.getRelTraitDefs());

              planner.clearRelTraitDefs();

              for (RelTraitDef def : relTraitDefs) {

                if (!(def instanceof RelCollationTraitDef)) {

                  planner.addRelTraitDef(def);

                }

              }

            });

    // register JDBC driver

    INSTANCE.register();

  }



  @Override

  protected AvaticaFactory createFactory() {

    return JdbcFactory.wrap((CalciteFactory) super.createFactory());

  }



  @Override

  protected String getConnectStringPrefix() {

    return CONNECT_STRING_PREFIX;

  }



  /**

   * Configures Unified-specific options and opens a JDBC connection to Calcite.

   *

   * <p>If {@code originalConnectionProperties} doesn't have the Unified-specific properties, populates

   * them with defaults (e.g. sets the default schema name to "unified").

   *

   * <p>Returns null if {@code url} doesn't begin with {@link #CONNECT_STRING_PREFIX}. This seems to

   * be how JDBC decides whether a driver can handle a request. It tries to connect to it, and if

   * the result is null it picks another driver.

   *

   * <p>Returns an instance of {@link JdbcConnection} which is a Unified wrapper around {@link

   * CalciteConnection}.

   */

  @Override

  public Connection connect(String url, Properties info) throws SQLException {

    // calciteConnection is initialized with an empty Unified schema,

    // we need to populate it with pipeline options, load table providers, etc

    return JdbcConnection.initialize((CalciteConnection) super.connect(url, info));

  }



  /**

   * Connects to the driver using standard {@link #connect(String, Properties)} call, but overrides

   * the initial schema factory. Default factory would load up all table providers. The one

   * specified here doesn't load any providers. We then override the top-level schema with the

   * {@code tableProvider}.

   *

   * <p>This is called in tests and {@link UnifiedSqlEnv}, core part of {@link SqlTransform}. CLI uses

   * standard JDBC driver registry, and goes through {@link #connect(String, Properties)} instead,

   * not this path. The CLI ends up using the schema factory that populates the default schema with

   * all table providers it can find. See {@link UnifiedCalciteSchemaFactory}.

   */

  public static JdbcConnection connect(TableProvider tableProvider) {

    try {

      Properties properties = new Properties();

      properties.setProperty(

          SCHEMA_FACTORY.camelName(), UnifiedCalciteSchemaFactory.Empty.class.getName());

      JdbcConnection connection =

          (JdbcConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, properties);

      connection.setSchema(TOP_LEVEL_BEAM_SCHEMA, tableProvider);

      return connection;

    } catch (SQLException e) {

      throw new RuntimeException(e);

    }

  }

}